package com.ld.shieldsb.canalclient.client;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.MDC;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.ld.shieldsb.canalclient.handler.ICanalDataSyncHandler;
import com.ld.shieldsb.canalclient.model.CommonMessage;
import com.ld.shieldsb.canalclient.model.Dml;
import com.ld.shieldsb.canalclient.util.MessageUtil;
import com.ld.shieldsb.common.core.collections.ListUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * Canal client that pulls batches from a {@link CanalConnector} on a dedicated worker thread, converts them
 * to {@link Dml} objects, optionally hands them to the recorder and then passes them to every registered
 * {@link ICanalDataSyncHandler}.
 * <p>
 * Lifecycle: {@link #start()} spawns the worker thread running {@link #process()}; {@link #stop()} clears the
 * running flag, waits for the worker to finish and then releases handlers and the recorder handler.
 *
 * @ClassName AbstractCanalClient
 * @author <a href="mailto:donggongai@126.com" target="_blank">吕凯</a>
 * @date 2021-12-02 18:34:10
 *
 */
@Slf4j
public class AbstractCanalClient extends BaseCanalClient {
    /** Number of entries requested from the canal server per {@code getWithoutAck} call. */
    private static final int FETCH_BATCH_SIZE = 5 * 1024;
    /** Pause before reconnecting when the server reports that the instance has not been started yet. */
    private static final long INSTANCE_NOT_STARTED_RETRY_MILLIS = 60 * 1000L;
    /** Pause before reconnecting after any other failure. */
    private static final long DEFAULT_RETRY_MILLIS = 10 * 1000L;

    /** Maximum number of DML records passed to a handler in a single {@code handle} call. */
    private int batchSize = 1000;

    /**
     * Creates a client bound to one canal destination.
     *
     * @param destination
     *            name of the canal destination (instance) this client consumes
     * @param connector
     *            connector used to talk to the canal server; {@link #start()} refuses to run when it is
     *            {@code null}
     */
    public AbstractCanalClient(String destination, CanalConnector connector) {
        this.destination = destination;
        this.connector = connector;
    }

    /**
     * Starts the recorder handler (if any) and the worker thread.
     * <p>
     * Does nothing except log an error when the client is already running or when no connector is configured.
     * The connector is validated before anything is started so that a failed start leaves no recorder handler
     * running behind.
     */
    public void start() {
        if (isRunning()) {
            log.error("已经启动，无需重复启动！");
            return;
        }
        // Validate first: starting the recorder and then bailing out would leave it running with
        // running == false, and stop() would never shut it down.
        if (connector == null) {
            log.error("connector 为空，不执行！");
            return;
        }
        // Start the recorder handler.
        if (getRecorderHandler() != null) {
            getRecorderHandler().start();
        }
        // Start the worker thread.
        thread = new Thread(this::process);
        thread.setUncaughtExceptionHandler(handler);
        running = true;
        thread.start();
    }

    /**
     * Stops the client: clears the running flag, waits for the worker thread to exit and only then destroys
     * the handlers and stops the recorder handler.
     * <p>
     * Resources are released after the join so the worker never calls a handler that has already been
     * destroyed and never blocks on a recorder that is no longer being drained.
     */
    public void stop() {
        if (!running) {
            return;
        }
        // Signal the worker loop to exit, then wait for it to finish its current iteration.
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                log.error("", e);
                Thread.currentThread().interrupt();
            }
        }
        // Release handler resources now that the worker no longer uses them.
        if (ListUtils.isNotEmpty(handlers)) {
            handlers.stream().forEach(ICanalDataSyncHandler::destroy);
        }
        if (getRecorderHandler() != null) {
            getRecorderHandler().stop();
        }

        MDC.remove("destination");
    }

    /**
     * Worker loop: connects and subscribes, then repeatedly fetches a batch without acknowledging it,
     * dispatches it and acknowledges it. On any failure the error is logged, the thread sleeps for a retry
     * delay, a rollback is attempted and the connection is re-established on the next outer iteration.
     */
    protected void process() {
        while (running) {
            try {
                switchGet(); // wait until the switch is open (blocks while it is closed)
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                while (running) {
                    try {
                        switchGet(30L, TimeUnit.MINUTES); // wait up to 30 minutes
                    } catch (TimeoutException e) {
                        break; // timed out: leave the inner loop so the connection is rebuilt
                    }
                    if (!running) { // stop() may have been called while waiting on the switch
                        break;
                    }
                    setRunningState(RUNNING_SUCCESS); // reaching this point means data can be fetched normally

                    Message message = connector.getWithoutAck(FETCH_BATCH_SIZE);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId != -1 && size != 0) {
                        dispatch(message);
                    }

                    if (batchId != -1) {
                        connector.ack(batchId); // confirm the batch
                    }
                }
            } catch (Throwable e) {
                log.error("canal异常！！！", e);
                try {
                    Thread.sleep(retryDelayMillis(e)); // back off before retrying
                } catch (InterruptedException e1) {
                    log.error("", e1);
                    Thread.currentThread().interrupt();
                }

                try {
                    connector.rollback(); // processing failed: roll back the unacknowledged batch
                } catch (Exception ignored) {
                    // Best effort: the connection may already be unusable, in which case rollback cannot work.
                    log.debug("connector rollback failed", ignored);
                }
            } finally {
                try {
                    connector.disconnect();
                } catch (Exception e) {
                    // Must not escape: it would skip the cleanup below and terminate the worker thread.
                    log.error("connector disconnect failed", e);
                }
                setRunningState(RUNNING_ERROR); // mark the running state as abnormal while disconnected
                MDC.remove("destination");
            }
        }
    }

    /**
     * Chooses how long to wait before reconnecting after a failure.
     * <p>
     * A message containing {@code "should start first"} (e.g. {@code CanalServerException: destination:xxx
     * should start first}, reported when subscribing to an instance that is not started) gets the longer
     * delay. A throwable without a message is treated as an ordinary failure.
     *
     * @param e
     *            the failure caught by the worker loop
     * @return delay in milliseconds
     */
    private static long retryDelayMillis(Throwable e) {
        String message = e.getMessage();
        if (message != null && message.contains("should start first")) {
            return INSTANCE_NOT_STARTED_RETRY_MILLIS;
        }
        return DEFAULT_RETRY_MILLIS;
    }

    /**
     * Converts a fetched message to DML records, records them and passes them to every handler.
     *
     * @param message
     *            non-empty message fetched from the connector
     */
    private void dispatch(Message message) {
        // 1. Parse the Message object.
        List<CommonMessage> commonMessages = MessageUtil.convert(message);
        List<Dml> dmls = MessageUtil.flatMessage2Dml(destination, null, commonMessages);
        if (!ListUtils.isNotEmpty(dmls)) {
            return;
        }
        recordDmls(dmls);
        if (ListUtils.isNotEmpty(handlers)) {
            // Handlers currently run serially; they could later run in parallel across groups while staying
            // serial within a group.
            for (ICanalDataSyncHandler syncHandler : handlers) {
                batchSync(dmls, syncHandler);
            }
        }
    }

    /**
     * Passes every DML to the recorder, if one is configured.
     * <p>
     * NOTE: recorded entries must be consumed elsewhere, otherwise the recorder queue fills up and
     * {@code recordDml} blocks. If the thread is interrupted while recording, the interrupt status is restored
     * and the remaining records are skipped.
     *
     * @param dmls
     *            records to hand to the recorder
     */
    private void recordDmls(List<Dml> dmls) {
        if (getRecorder() == null) {
            return;
        }
        for (Dml dml : dmls) {
            try {
                getRecorder().recordDml(dml);
            } catch (InterruptedException e) {
                log.error("recordDml interrupted", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Passes the records to one handler, split into chunks of at most {@link #getBatchSize()} records.
     *
     * @param dmls
     *            records to synchronize
     * @param adapter
     *            handler that receives the records
     */
    private void batchSync(List<Dml> dmls, ICanalDataSyncHandler adapter) {
        if (dmls.size() <= batchSize) {
            adapter.handle(dmls, getRecorder());
        } else { // more records than one chunk allows; could be optimized
            for (List<Dml> dmlsBatch : ListUtils.splitList2Group(dmls, batchSize)) {
                adapter.handle(dmlsBatch, getRecorder());
            }
        }
    }

    /**
     * @return the maximum number of records passed to a handler per call
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sets the maximum number of records passed to a handler per call.
     * 
     * @Title setBatchSize
     * @author 吕凯
     * @date 2021-12-07 11:25:22
     * @param batchSize
     *            chunk size used when splitting records for handlers
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

}
